{
 "cells": [
  {
   "cell_type": "code",
   "execution_count": 1,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "from pyspark import SparkContext, SparkConf\n",
    "from pyspark.sql import SparkSession"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 2,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "sc = SparkContext(conf=SparkConf())\n",
    "spark = SparkSession(sparkContext=sc)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Example data"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 3,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+---+------+---+---+---+---+\n",
      "| x1|    x2| x3| x4| y1| y2|\n",
      "+---+------+---+---+---+---+\n",
      "|  a| apple|  1|2.4|  1|yes|\n",
      "|  a|orange|  1|2.5|  0| no|\n",
      "|  b|orange|  2|3.5|  1| no|\n",
      "|  b|orange|  2|1.4|  0|yes|\n",
      "|  b| peach|  2|2.1|  0|yes|\n",
      "|  c| peach|  4|1.5|  1|yes|\n",
      "+---+------+---+---+---+---+\n",
      "\n"
     ]
    }
   ],
   "source": [
    "import pandas as pd\n",
    "pdf = pd.DataFrame({\n",
    "        'x1': ['a','a','b','b', 'b', 'c'],\n",
    "        'x2': ['apple', 'orange', 'orange','orange', 'peach', 'peach'],\n",
    "        'x3': [1, 1, 2, 2, 2, 4],\n",
    "        'x4': [2.4, 2.5, 3.5, 1.4, 2.1,1.5],\n",
    "        'y1': [1, 0, 1, 0, 0, 1],\n",
    "        'y2': ['yes', 'no', 'no', 'yes', 'yes', 'yes']\n",
    "    })\n",
    "df = spark.createDataFrame(pdf)\n",
    "df.show()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Pipeline\n",
    "\n",
    "Pipeline is a sequence of stages which consists of **Estimators** and/or **Transformers**. **Estimator** has **`fit`** method and **Transformer** has **`transform`** method. Therefore, we can say, **a pipeline is a sequence of `fit` and `transform` methods**. When it is a **`fit`** method, it applies to the input data and turns into a **`transform`** method. Then the **`transform`** method applies to the **fitted** data and output **transformed** data. **The transformed data output from previous stage has to be an acceptable input to the next stage's fit/transform method**.\n",
    "\n",
    "![spark pipeline](images/spark-pipeline.png)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 5,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "from pyspark.ml import Pipeline\n",
    "from pyspark.ml.feature import StringIndexer, OneHotEncoder"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Example\n",
    "\n",
    "We are going to use pipeline to `StringIndex` columns x1, x2, y1, and y2. Then we `OneHotEncode` the resulting `StringIdexed` columns."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 21,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "[StringIndexer_4822a1cd65667da67acd,\n",
       " StringIndexer_458889022f08259328b3,\n",
       " StringIndexer_4cf4afdb4c495ecc227f,\n",
       " StringIndexer_4460a02263a04635288b]"
      ]
     },
     "execution_count": 21,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "stringindex_stages = [StringIndexer(inputCol=c, outputCol='idx_' + c) for c in ['x1', 'x2', 'y1', 'y2']]\n",
    "stringindex_stages"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 22,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "[OneHotEncoder_4435bac1095e2bed0952,\n",
       " OneHotEncoder_480db42beb32e2d6cb83,\n",
       " OneHotEncoder_498cbd54623097da2b6c,\n",
       " OneHotEncoder_460c95da4c6185ac4fe2]"
      ]
     },
     "execution_count": 22,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "onehotencode_stages = [OneHotEncoder(inputCol='idx_' + c, outputCol='ohe_' + c) for c in ['x1', 'x2', 'y1', 'y2']]\n",
    "onehotencode_stages"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Note that the **outputCol label** in StringIndex stages is the same as the **inputCol label** in the OneHotEncode stages."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Elements in the stage list"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 23,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "[pyspark.ml.feature.StringIndexer,\n",
       " pyspark.ml.feature.StringIndexer,\n",
       " pyspark.ml.feature.StringIndexer,\n",
       " pyspark.ml.feature.StringIndexer,\n",
       " pyspark.ml.feature.OneHotEncoder,\n",
       " pyspark.ml.feature.OneHotEncoder,\n",
       " pyspark.ml.feature.OneHotEncoder,\n",
       " pyspark.ml.feature.OneHotEncoder]"
      ]
     },
     "execution_count": 23,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "all_stages = stringindex_stages + onehotencode_stages\n",
    "[type(x) for x in all_stages]"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "In the above list, **`pyspark.ml.feature.StringIndexer`** is an **Estimator**(has a fit method) and **`pyspark.ml.feature.OneHotEncoder`** is a **transformer**(has a transform method)."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Build and run pipeline"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 24,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+---+------+---+---+---+---+------+------+------+------+-------------+-------------+-------------+-------------+\n",
      "| x1|    x2| x3| x4| y1| y2|idx_x1|idx_x2|idx_y1|idx_y2|       ohe_x1|       ohe_x2|       ohe_y1|       ohe_y2|\n",
      "+---+------+---+---+---+---+------+------+------+------+-------------+-------------+-------------+-------------+\n",
      "|  a| apple|  1|2.4|  1|yes|   1.0|   2.0|   1.0|   0.0|(2,[1],[1.0])|    (2,[],[])|    (1,[],[])|(1,[0],[1.0])|\n",
      "|  a|orange|  1|2.5|  0| no|   1.0|   0.0|   0.0|   1.0|(2,[1],[1.0])|(2,[0],[1.0])|(1,[0],[1.0])|    (1,[],[])|\n",
      "|  b|orange|  2|3.5|  1| no|   0.0|   0.0|   1.0|   1.0|(2,[0],[1.0])|(2,[0],[1.0])|    (1,[],[])|    (1,[],[])|\n",
      "|  b|orange|  2|1.4|  0|yes|   0.0|   0.0|   0.0|   0.0|(2,[0],[1.0])|(2,[0],[1.0])|(1,[0],[1.0])|(1,[0],[1.0])|\n",
      "|  b| peach|  2|2.1|  0|yes|   0.0|   1.0|   0.0|   0.0|(2,[0],[1.0])|(2,[1],[1.0])|(1,[0],[1.0])|(1,[0],[1.0])|\n",
      "|  c| peach|  4|1.5|  1|yes|   2.0|   1.0|   1.0|   0.0|    (2,[],[])|(2,[1],[1.0])|    (1,[],[])|(1,[0],[1.0])|\n",
      "+---+------+---+---+---+---+------+------+------+------+-------------+-------------+-------------+-------------+\n",
      "\n"
     ]
    }
   ],
   "source": [
    "Pipeline(stages=all_stages).fit(df).transform(df).show()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Reorder pipeline stages\n",
    "\n",
    "In the example above, our strategy is to **StringIndex** all four columns and then **OneHotEncode** them. Since each **OneHotEncode** stage only depends on the output of their corresponding **StringIndex** stage, our stages list could be **`[stringindexer on x1, onehotencoder on x1, stringindexer on x2, onehotencoder on x2, stringindexer on y1, onehotencoder on y1, stringindexer on y2, onehotencoder on y2]`**."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Old stages"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 33,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "[StringIndexer_4822a1cd65667da67acd,\n",
       " StringIndexer_458889022f08259328b3,\n",
       " StringIndexer_4cf4afdb4c495ecc227f,\n",
       " StringIndexer_4460a02263a04635288b,\n",
       " OneHotEncoder_4435bac1095e2bed0952,\n",
       " OneHotEncoder_480db42beb32e2d6cb83,\n",
       " OneHotEncoder_498cbd54623097da2b6c,\n",
       " OneHotEncoder_460c95da4c6185ac4fe2]"
      ]
     },
     "execution_count": 33,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "all_stages"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### New stages"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 35,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "[StringIndexer_4822a1cd65667da67acd,\n",
       " OneHotEncoder_4435bac1095e2bed0952,\n",
       " StringIndexer_458889022f08259328b3,\n",
       " OneHotEncoder_480db42beb32e2d6cb83,\n",
       " StringIndexer_4cf4afdb4c495ecc227f,\n",
       " OneHotEncoder_498cbd54623097da2b6c,\n",
       " StringIndexer_4460a02263a04635288b,\n",
       " OneHotEncoder_460c95da4c6185ac4fe2]"
      ]
     },
     "execution_count": 35,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "new_all_stages = [all_stages[x] for x in [0,4,1,5,2,6,3,7]]\n",
    "new_all_stages"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Build and run pipeline"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 36,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+---+------+---+---+---+---+------+-------------+------+-------------+------+-------------+------+-------------+\n",
      "| x1|    x2| x3| x4| y1| y2|idx_x1|       ohe_x1|idx_x2|       ohe_x2|idx_y1|       ohe_y1|idx_y2|       ohe_y2|\n",
      "+---+------+---+---+---+---+------+-------------+------+-------------+------+-------------+------+-------------+\n",
      "|  a| apple|  1|2.4|  1|yes|   1.0|(2,[1],[1.0])|   2.0|    (2,[],[])|   1.0|    (1,[],[])|   0.0|(1,[0],[1.0])|\n",
      "|  a|orange|  1|2.5|  0| no|   1.0|(2,[1],[1.0])|   0.0|(2,[0],[1.0])|   0.0|(1,[0],[1.0])|   1.0|    (1,[],[])|\n",
      "|  b|orange|  2|3.5|  1| no|   0.0|(2,[0],[1.0])|   0.0|(2,[0],[1.0])|   1.0|    (1,[],[])|   1.0|    (1,[],[])|\n",
      "|  b|orange|  2|1.4|  0|yes|   0.0|(2,[0],[1.0])|   0.0|(2,[0],[1.0])|   0.0|(1,[0],[1.0])|   0.0|(1,[0],[1.0])|\n",
      "|  b| peach|  2|2.1|  0|yes|   0.0|(2,[0],[1.0])|   1.0|(2,[1],[1.0])|   0.0|(1,[0],[1.0])|   0.0|(1,[0],[1.0])|\n",
      "|  c| peach|  4|1.5|  1|yes|   2.0|    (2,[],[])|   1.0|(2,[1],[1.0])|   1.0|    (1,[],[])|   0.0|(1,[0],[1.0])|\n",
      "+---+------+---+---+---+---+------+-------------+------+-------------+------+-------------+------+-------------+\n",
      "\n"
     ]
    }
   ],
   "source": [
    "Pipeline(stages=new_all_stages).fit(df).transform(df).show()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": []
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.5.0"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 2
}
